import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statistics import mean
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import plotly.io as pio
# Use pio to ensure plotly plots render in Quarto
pio.renderers.default = 'notebook_connected'
Handmade Machine Learning Algorithms
Below is a collection of Machine Learning algorithms coded from scratch. This project is an exploration of how common machine learning models work under the hood. By implementing these algorithms from scratch in Python, I aim to deepen my understanding of the mathematics powering these techniques, while also visualizing their pros, cons, and differences!
Featured Algorithms:
- Linear Regression: Understanding the basics of predictive modeling - both through ordinary least squares (OLS), and gradient descent (GD).
- K-Nearest Neighbors (KNN): A simple yet non-linear approach.
- Decision Trees: Learning interpretable models for classification and regression tasks.
- Random Forest: A foray into ensemble learning.
- Gradient Boosting: A robust ensemble method for tackling complex problems.
Feel free to explore the code!
Intro Code
Imports
Function to Plot Training Data and Regression Plane
Code
import plotly.graph_objects as go
def plot_3d_regression(model, model_name, data_type = 'linear'):
    '''Fit `model` on a synthetic 2-feature regression dataset and render a
    3D Plotly figure of the training points and the model's prediction surface.

    Parameters:
        model: any object exposing fit(X, y) and predict(X).
        model_name: label used in the figure title.
        data_type: 'linear' (default), 'non-linear', or 'ugly' — controls how
            much non-linear structure and noise is added to the target.
    '''
    # Normalize the flag once so every comparison is case-insensitive
    # (previously only the 'non-linear' check was lower-cased, so
    # data_type='Ugly' silently fell through to the linear dataset).
    data_type = data_type.lower()
    # Base linear dataset.
    X, y = make_regression(
        n_samples=300,   # Number of samples
        n_features=2,    # Number of features
        noise=15,        # Add noise to make it realistic
        random_state=42  # Set seed for reproducibility
    )
    # Smooth non-linear structure.
    if data_type == 'non-linear':
        y = y + 500 * np.sin(X[:, 0]) + 250 * np.cos(X[:, 1])
    # Harder, noisier dataset.
    if data_type == 'ugly':
        # Non-linear transformations of each feature.
        y = y + 500 * np.sin(X[:, 0]) * np.cos(X[:, 1]) \
            + 300 * (X[:, 0] ** 2) \
            - 200 * np.exp(-X[:, 1] ** 2)
        # Feature interaction term.
        y = y + 100 * (X[:, 0] * X[:, 1])
        # Extra noise to make it "uglier".
        # NOTE(review): this draw is unseeded, so the 'ugly' dataset differs
        # between runs even though make_regression itself is seeded.
        y = y + np.random.normal(0, 150, size=y.shape)
    model.fit(X, y)
    # Mesh grid spanning the observed feature ranges, used for the surface.
    x_grid, y_grid = np.meshgrid(np.linspace(min(X[:, 0]), max(X[:, 0]), 100),
                                 np.linspace(min(X[:, 1]), max(X[:, 1]), 100))
    grid = np.vstack((x_grid.flatten(), y_grid.flatten())).T
    predictions = model.predict(grid)
    # Training data as a 3D scatter.
    scatter = go.Scatter3d(
        x=X[:, 0], y=X[:, 1], z=y,
        mode='markers', marker=dict(color='blue', size=5), name='Training Data'
    )
    # Model predictions as a semi-transparent surface.
    surface = go.Surface(
        x=x_grid, y=y_grid, z=predictions.reshape(x_grid.shape), opacity=0.5, colorscale='Viridis', name='Regression Surface'
    )
    # Combine both traces into one figure.
    fig = go.Figure(data=[scatter, surface])
    # Axis labels and title.
    fig.update_layout(
        title=f'Training Data and Regression Surface for {model_name}',
        scene=dict(
            xaxis_title='Feature 1',
            yaxis_title='Feature 2',
            zaxis_title='Target'
        )
    )
    fig.show()
Regression Algorithms
OLS Regression
class ols_regression():
    '''Ordinary least squares linear regression solved in closed form.'''

    def __init__(self):
        # No hyperparameters: OLS has a closed-form solution.
        pass

    def fit(self, X, y):
        '''Fit the regression to the X data via the normal equations.

        X is (n_samples, n_features), y is (n_samples,). Stores
        self.coef (n_features,) and self.bias (scalar).
        '''
        # Add a leading column of 1s so the first weight acts as the bias term.
        X = np.hstack((np.ones((X.shape[0], 1)), X))
        # Solve (X.T @ X) w = X.T @ y directly. np.linalg.solve is more
        # numerically stable and cheaper than explicitly inverting X.T @ X.
        ols = np.linalg.solve(X.T.dot(X), X.T.dot(y))
        self.coef = ols[1:]
        self.bias = ols[0]

    def predict(self, X):
        '''Predict new data with the trained coefficients and bias.'''
        # Promote a 1-D array to a column so the dot product works for
        # single-feature models.
        if X.ndim == 1:
            X = X.reshape(-1, 1)
        # Store predictions on the instance; the caller returns them.
        self.predictions = X.dot(self.coef) + self.bias
        return self.predictions
For each algorithm, we build two practice datasets on which we fit the algorithm. These are a linear dataset and an ‘ugly’ dataset that is non-linear and contains significant noise. We then plot the training data and a projection grid for the trained algorithm to understand how it makes predictions and showcase its ability to ‘understand’ patterns in the underlying data.
# Fit and visualize the OLS model on both the linear and the 'ugly' dataset.
ols_model = ols_regression()
plot_3d_regression(ols_model, model_name='OLS', data_type='linear')
plot_3d_regression(ols_model, model_name='OLS', data_type='ugly')
While we can see that the OLS regression fits the linear dataset quite well, there is significant error when fitting to the non-linear data. As we can see in the prediction grid, this is because the regression only operates in a linear fashion, which does not match the structure of the training data.
Gradient Descent Regression
Next, we build a linear regression via gradient descent. Unlike OLS which is a closed form solution, gradient descent operates by calculating the gradient of the loss function (MSE) for any given set of regression weights, and systematically changing the weights in order to ‘move down’ the gradient and minimize the loss of predictions.
While the final result should be the same as in OLS, they do not always align due to some randomness in attempting to find the absolute minimum of the loss function (although predictions are generally close enough). That said, Gradient Descent is a good option for larger datasets due to its lower computational requirements.
class GDRegression():
    '''Simple class for performing linear regression via gradient descent.
    Note: this is a simple execution and does not include options for
    regularization.'''

    def __init__(self, epochs, eta):
        '''Initialize with a number of training epochs and learning rate eta.'''
        self.epochs = epochs
        self.eta = eta

    def fit(self, X, y, batch_size=None):
        '''Train on X (n_samples, n_features) and y (n_samples,).

        batch_size defaults to the full dataset (batch gradient descent);
        pass a smaller value for mini-batch updates. The legacy string
        sentinel 'TBD' is still accepted for backward compatibility, but
        None is the idiomatic default.
        '''
        if batch_size is None or batch_size == 'TBD':
            batch_size = X.shape[0]
        # Random initialization for the bias and coefficients.
        bias = np.random.random()
        coef = np.random.random(X.shape[1])
        # One parameter update per epoch.
        for _ in range(self.epochs):
            # Sample a batch of rows without replacement.
            indices = np.random.choice(X.shape[0], size=min(batch_size, len(y)), replace=False)
            X_batch = X[indices]
            y_batch = y[indices]
            # Predictions with the current parameters.
            y_hat = X_batch.dot(coef) + bias
            # Gradients of the MSE loss w.r.t. bias and coefficients.
            grad_b = 2 / X_batch.shape[0] * np.sum(y_hat - y_batch)
            grad_c = 2 / X_batch.shape[0] * X_batch.T.dot(y_hat - y_batch)
            # Step down the gradient.
            bias = bias - grad_b * self.eta
            coef = coef - grad_c * self.eta
        # Finalize the learned parameters.
        self.bias = bias
        self.coef = coef

    def predict(self, X):
        '''Predict new data given the learned bias and coef.'''
        predictions = X.dot(self.coef) + self.bias
        return predictions
# Fit the gradient-descent regressor and visualize it on the linear dataset.
gd_reg = GDRegression(epochs=10000, eta=.01)
plot_3d_regression(gd_reg, 'Gradient Descent', data_type='linear')
plot_3d_regression(gd_reg, 'Gradient Descent', data_type='ugly')
As expected, the results for the gradient descent regression are effectively the same as for the OLS! The prediction grid is fit very well to the linear dataset, but fails to understand the non-linear nature of the ‘ugly’ data.
KNN Regression
We next build a K-Nearest Neighbors regressor. K-Nearest Neighbors works by taking the K closest (physically) points to any specific point in question, and predicting the average of those K points as the output. This works with the assumption that points close together are similar in nature.
class KNNRegressor():
    '''A simple class for performing Nearest-Neighbors Regression. Note: this
    is a simple execution, and does not include options for changes in
    distance calculation metrics beyond Euclidean distance.'''

    def __init__(self, n_neighbors=5):
        '''Initialize the regressor with a defined number of nearest neighbors'''
        self.n_neighbors = n_neighbors

    def fit(self, X, y):
        '''"Train" the regressor by memorizing the full training set.'''
        self.X = X
        self.y = y

    def predict(self, X):
        '''Predict each test point as the mean target of its n_neighbors
        nearest training points, by Euclidean distance.'''
        preds = []
        for test_point in X:
            # Distance from this test point to every training point.
            distances = np.linalg.norm(self.X - test_point, axis=1)
            # Indices of the n_neighbors closest training points.
            nearest = np.argsort(distances)[:self.n_neighbors]
            # Average their targets. np.mean replaces the pure-Python
            # statistics.mean, and collecting into a list avoids the
            # O(n^2) reallocation of np.append in a loop.
            preds.append(np.mean(self.y[nearest]))
        predictions = np.asarray(preds)
        return predictions

knn_regressor = KNNRegressor()
# Visualize the KNN regressor's fit on the linear dataset.
plot_3d_regression(knn_regressor, "K-Nearest Neighbors Regression", data_type='linear')
plot_3d_regression(knn_regressor, "K-Nearest Neighbors Regression", data_type='ugly')
We first see that unlike in the linear regression, our prediction grids are now bumpy! This is because each prediction is dependent on the K nearest points, which introduces randomness into the prediction grid. The next big change vs. the previous linear regression is that the KNN regression is able to fit the non-linear data much better. However, we do still see some issues in fitting some of the very noisy data points well, as they fail to fit the underlying assumption that points close together are similar.
Decision Tree Regressor
We next build a decision tree regressor. Decision Tree Regression splits the data into subsets based on feature values to create a tree-like model of decisions. At each split, the algorithm chooses the feature and threshold that minimize the error in predicting the target variable.
The model predicts the target value of a new data point by following the tree’s splits until reaching the bottom of the tree, where the output is the average of the target values in that leaf.
class DecisionTreeRegressor():
    '''A class to build a decision tree regressor. Note this is a simple
    execution and does not include common parameters for regularization
    other than max_depth'''

    def __init__(self, max_depth=None):
        # Optional maximum depth; None grows the tree until leaves are pure.
        self.max_depth = max_depth

    def mse(self, y):
        '''Mean squared error of y around its own mean (node impurity).'''
        return np.mean((y - np.mean(y)) ** 2)

    def _best_split(self, X, y):
        '''Exhaustively search every (feature, unique value) split and return
        the one minimizing the weighted child MSE.

        Returns (split_value, feature_index, left_targets, right_targets);
        feature_index is None when no valid split exists. Uses locals rather
        than instance attributes: the original assigned self.left_y but
        initialized self.best_left_y, so a node with no valid split raised
        AttributeError (or silently returned stale state from a prior call).
        '''
        best_mse = float('inf')  # Best MSE starts as infinity
        best_feature = None
        best_split_value = None
        best_left_y = None
        best_right_y = None
        # Loop over each feature in the dataset.
        for feature_num in range(X.shape[1]):
            # Each unique value of the feature is a candidate threshold.
            for value in np.unique(X[:, feature_num]):
                # Samples <= value go left, the rest go right.
                left_index = X[:, feature_num] <= value
                right_index = X[:, feature_num] > value
                left_targets = y[left_index]
                right_targets = y[right_index]
                # Only consider splits where both children are non-empty.
                if len(left_targets) > 0 and len(right_targets) > 0:
                    left_mse = self.mse(left_targets)
                    right_mse = self.mse(right_targets)
                    # Weighted average of the child impurities.
                    total_average_mse = left_mse * len(left_targets) / len(y) + right_mse * len(right_targets) / len(y)
                    # Keep the lowest-MSE split seen so far.
                    if total_average_mse < best_mse:
                        best_mse = total_average_mse
                        best_feature = feature_num
                        best_split_value = value
                        best_left_y = left_targets
                        best_right_y = right_targets
        return best_split_value, best_feature, best_left_y, best_right_y

    def _build_tree(self, X, y, depth=0):
        '''Recursively grow the tree. Returns a nested dict for internal
        nodes and a float (leaf mean) for leaves.'''
        # Base case: pure node or depth cap reached.
        if len(np.unique(y)) == 1 or (self.max_depth is not None and depth >= self.max_depth):
            return np.mean(y)
        best_split_value, best_feature, left_y, right_y = self._best_split(X, y)
        # No valid split found: make this node a leaf.
        if best_feature is None:
            return np.mean(y)
        # Partition the rows and recurse into each side.
        left_index = X[:, best_feature] <= best_split_value
        right_index = X[:, best_feature] > best_split_value
        left_tree = self._build_tree(X[left_index], left_y, depth + 1)
        right_tree = self._build_tree(X[right_index], right_y, depth + 1)
        return {
            'feature': best_feature,
            'value': best_split_value,
            'left': left_tree,
            'right': right_tree
        }

    def _single_prediction(self, tree, x):
        '''Walk the tree for a single sample x and return the leaf mean.'''
        if isinstance(tree, dict):
            # Use <= to mirror the training split; the original used <, which
            # sent samples equal to the threshold down the wrong branch.
            if x[tree['feature']] <= tree['value']:
                return self._single_prediction(tree['left'], x)  # Go left
            else:
                return self._single_prediction(tree['right'], x)  # Go right
        # Leaf node: the stored mean is the prediction.
        else:
            return tree

    def predict(self, X):
        '''Predict a target for each sample in X by traversing the tree.'''
        predictions = np.array([self._single_prediction(self.tree, x) for x in X])
        return predictions

    def fit(self, X, y):
        '''Build the tree from the training data.'''
        self.tree = self._build_tree(X, y)

dt_reg = DecisionTreeRegressor()
# Visualize the decision tree's fit on the linear dataset.
plot_3d_regression(dt_reg, "Decision Tree", data_type='linear')
plot_3d_regression(dt_reg, "Decision Tree", data_type='ugly')
With the Decision Tree regressor, similar to the KNN regressor, we are able to fit well to both the linear and non-linear datasets. The regression surface does however look a bit different, as there are sharper horizontal cuts rather than an overall bumpy surface. This is caused by the ‘cutting’ of the dataset on feature values during training, which segments the data sharply.
Random Forest Regression
We next build a Random Forest Regressor. Random Forest Regression improves upon Decision Trees by combining the predictions of multiple trees to make a more accurate and stable model. Each tree is trained on a random subset of the data and features, and the final prediction is the average of all the trees’ outputs.
class RandomForestRegressor():
    def __init__(self, n_estimators, max_depth=None):
        '''A class to build a Random Forest Regressor. Note this is a simple
        execution and does not include parameters for regularization other
        than n_estimators and max_depth'''
        # Number of bootstrapped trees and an optional per-tree depth cap.
        self.n_estimators = n_estimators
        self.max_depth = max_depth

    def mse(self, y):
        '''Mean squared error of y around its own mean (node impurity).'''
        return np.mean((y - np.mean(y)) ** 2)

    def _best_split(self, X, y):
        '''Find the best (feature, value) split over a random subset of
        features (about one third of them, at least one).

        Returns (split_value, feature_index, left_targets, right_targets);
        feature_index is None when no valid split exists. Uses locals rather
        than instance attributes: the original assigned self.left_y but
        initialized self.best_left_y, so a node with no valid split raised
        AttributeError (or silently returned stale state from a prior call).
        '''
        best_mse = float('inf')  # Best MSE starts as infinity
        best_feature = None
        best_split_value = None
        best_left_y = None
        best_right_y = None
        # Random feature subsample — the key difference from a plain tree.
        n_features_to_consider = max(1, X.shape[1] // 3)
        random_feature_indices = np.random.choice(X.shape[1], size=n_features_to_consider, replace=False)
        # Only the sampled features are candidates for the split.
        for feature_num in random_feature_indices:
            for value in np.unique(X[:, feature_num]):
                # Samples <= value go left, the rest go right.
                left_index = X[:, feature_num] <= value
                right_index = X[:, feature_num] > value
                left_targets = y[left_index]
                right_targets = y[right_index]
                # Only consider splits where both children are non-empty.
                if len(left_targets) > 0 and len(right_targets) > 0:
                    left_mse = self.mse(left_targets)
                    right_mse = self.mse(right_targets)
                    # Weighted average of the child impurities.
                    total_average_mse = left_mse * len(left_targets) / len(y) + right_mse * len(right_targets) / len(y)
                    # Keep the lowest-MSE split seen so far.
                    if total_average_mse < best_mse:
                        best_mse = total_average_mse
                        best_feature = feature_num
                        best_split_value = value
                        best_left_y = left_targets
                        best_right_y = right_targets
        return best_split_value, best_feature, best_left_y, best_right_y

    def _build_tree(self, X, y, depth=0):
        '''Recursively grow one tree. Returns a nested dict for internal
        nodes and a float (leaf mean) for leaves.'''
        # Base case: pure node or depth cap reached.
        if len(np.unique(y)) == 1 or (self.max_depth is not None and depth >= self.max_depth):
            return np.mean(y)
        best_split_value, best_feature, left_y, right_y = self._best_split(X, y)
        # No valid split found: make this node a leaf.
        if best_feature is None:
            return np.mean(y)
        # Partition the rows and recurse into each side.
        left_index = X[:, best_feature] <= best_split_value
        right_index = X[:, best_feature] > best_split_value
        left_tree = self._build_tree(X[left_index], left_y, depth + 1)
        right_tree = self._build_tree(X[right_index], right_y, depth + 1)
        return {
            'feature': best_feature,
            'value': best_split_value,
            'left': left_tree,
            'right': right_tree
        }

    def _single_prediction(self, tree, x):
        '''Walk one tree for a single sample x and return the leaf mean.'''
        if isinstance(tree, dict):
            # Use <= to mirror the training split; the original used <, which
            # sent samples equal to the threshold down the wrong branch.
            if x[tree['feature']] <= tree['value']:
                return self._single_prediction(tree['left'], x)  # Go left
            else:
                return self._single_prediction(tree['right'], x)  # Go right
        # Leaf node: the stored mean is the prediction.
        else:
            return tree

    def predict(self, X):
        '''Average the per-tree predictions for every sample in X.'''
        predictions = []
        for tree in self.trees:
            model_predictions = np.array([self._single_prediction(tree, x) for x in X])
            predictions.append(model_predictions)
        # One column per tree; the row mean is the ensemble prediction.
        all_predictions = np.column_stack(predictions)
        all_predictions = np.mean(all_predictions, axis=1)
        return all_predictions

    def fit(self, X, y):
        '''Train n_estimators trees, each on a bootstrap sample of the data.'''
        models = []
        for n in range(self.n_estimators):
            # Bootstrap: sample rows with replacement.
            random_row_indices = np.random.choice(len(X), len(X), replace=True)
            subset_X = X[random_row_indices]
            subset_y = y[random_row_indices]
            tree = self._build_tree(subset_X, subset_y)
            models.append(tree)
        # Save all the trained trees.
        self.trees = models

rf_regressor = RandomForestRegressor(10)
# Visualize the random forest's fit on the linear dataset.
plot_3d_regression(rf_regressor, "Random Forest Regressor", "linear")
plot_3d_regression(rf_regressor, "Random Forest Regressor", "ugly")
As expected, the Random Forest regressor fits both datasets quite well, and has a regression surface that is similar to the Decision Tree, although a bit smoother. Random Forest regression is likely to grow its improvements over Decision Trees as the underlying dataset grows larger and more complex.
Gradient Boosting Regression
For our final regression method, we build a Gradient Boost Regressor. Gradient Boosting Regression builds a model by combining many small and poor decision trees, each one correcting the errors of the previous trees. It works iteratively, adding trees that focus on reducing the residual errors in the predictions. The final prediction is the sum of all the trees’ outputs.
class GradientBoostingRegressor:
    def __init__(self, n_estimators=100, learning_rate=0.1, max_depth=3):
        '''A class for building a Gradient Boosting Regressor. Note this is a
        simple execution and does not include parameters for regularization
        other than n_estimators, learning_rate, and max_depth'''
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth

    def fit(self, X, y):
        '''Fit n_estimators shallow trees, each on the residuals of the
        ensemble built so far. Relies on the DecisionTreeRegressor class
        defined earlier in this file.'''
        # Baseline prediction is the training-target mean. It is stored so
        # predict() can start from the exact same constant — the original
        # recomputed np.mean(self.y_pred) at predict time, i.e. the mean of
        # the *final* training predictions, which is not the fitted baseline.
        self.initial_prediction = np.mean(y)
        self.y_pred = np.full_like(y, self.initial_prediction, dtype=np.float32)
        self.trees = []
        for _ in range(self.n_estimators):
            # Residuals of the current ensemble.
            residuals = y - self.y_pred
            # Fit the next tree to the residuals.
            tree = DecisionTreeRegressor(max_depth=self.max_depth)
            tree.fit(X, residuals)
            # Update running predictions, scaled by the learning rate.
            self.y_pred += self.learning_rate * tree.predict(X)
            self.trees.append(tree)

    def predict(self, X):
        '''Predict by summing the stored baseline and each tree's scaled output.'''
        y_pred = np.full(X.shape[0], self.initial_prediction, dtype=np.float32)
        for tree in self.trees:
            y_pred += self.learning_rate * tree.predict(X)
        return y_pred

boost_regressor = GradientBoostingRegressor(n_estimators=50)
# Visualize the gradient boosting model's fit on the linear dataset.
plot_3d_regression(boost_regressor, "Gradient Boost Regressor", "linear")
plot_3d_regression(boost_regressor, "Gradient Boost Regressor", "ugly")
We once again see a very strong fit to the training data both in the linear and non-linear cases from the Gradient Boosting Regressor. Additionally, as compared to previous non-linear methods, the surface is a fair bit smoother. There are a few large, sharp cuts that represent overfitting, but these can be combated with regularization techniques not included in this simple execution!